[備忘録] Kinesis FirehoseのLambdaによるデータ変換について
こんにちは、CX事業本部の夏目です。
案件でFirehoseのLambdaによるデータ変換を使ったのですが、FirehoseのドキュメントがLambdaのBluePrint(設計図)を使ってねぐらいの書き方で、どんなeventが来てどんなresponseを返さないといけないのか詳しく書いてなかったので、備忘録として残しておきます。
Lambdaの設定について
- LambdaにつけるIAM Roleでは特に必要と言えるものはない
- タイムアウトは5分までしか対応していない (これはドキュメントに記載されている)
Lambdaに渡されるEventについて
{ "invocationId": "b384e3cf-8ae0-47ac-8e2a-8e4a9e59941e", "sourceKinesisStreamArn": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/firehose_ln", "deliveryStreamArn": "arn:aws:firehose:ap-northeast-1:000000000000:deliverystream/firehose_ln", "region": "ap-northeast-1", "records": [ { "recordId": "49607016322531838770965472870713485392692964656771235842000000", "approximateArrivalTimestamp": 1589518484529, "data": "eyJhIjogImM3NjNlM2I5LWI1NDQtNDg4NS04YWM1LWQ3ZDg4MjJjNGNmYyIsICJiIjogImQ1NjNiYWU0LTYwZTItNDhiOC1hZTQ0LTdkYjU2ZTZlNTZkNSIsICJjIjogIjljZjNkYmE3LTY2NTEtNGE2MC1iOGJhLThkNmFhMGYwMjNkMiJ9", "kinesisRecordMetadata": { "sequenceNumber": "49607016322531838770965472870713485392692964656771235842", "subsequenceNumber": 0, "partitionKey": "d39ae9bc-27ba-4208-92c8-98c607b84c46", "shardId": "shardId-000000000000", "approximateArrivalTimestamp": 1589518484529 } }, { "recordId": "49607016322531838770965472870714694318512579354665418754000000", "approximateArrivalTimestamp": 1589518485244, "data": "eyJhIjogIjUxNDJhMGUwLTdiMWYtNGMxNS1iZjBiLThhMjU2ZmQzZjkyYyIsICJiIjogImNkZTVmNWRmLWUzMGMtNGE2Yi1iNjJjLWZiNmY3Zjk1NTYzNSIsICJjIjogIjljN2YyMWRlLWU2MWItNGJhZi05YjQ0LTg2MmE0YmVhZTVlMiJ9", "kinesisRecordMetadata": { "sequenceNumber": "49607016322531838770965472870714694318512579354665418754", "subsequenceNumber": 0, "partitionKey": "75c67f74-41be-4be7-8d64-868712be1773", "shardId": "shardId-000000000000", "approximateArrivalTimestamp": 1589518485244 } } ] }
- 基本的に使用するのは、
records
の中身- 特に
recordId
はresponseで使用する
- 特に
- recordsの各要素の
data
は各データをBase64エンコードしたもの
Lambdaが返す値について
{ "records": [ { "recordId": "49607016322531838770965472870713485392692964656771235842000000", "result": "Ok", "data": "eyJhIjogImM3NjNlM2I5LWI1NDQtNDg4NS04YWM1LWQ3ZDg4MjJjNGNmYyIsICJiIjogImQ1NjNiYWU0LTYwZTItNDhiOC1hZTQ0LTdkYjU2ZTZlNTZkNSIsICJjIjogIjljZjNkYmE3LTY2NTEtNGE2MC1iOGJhLThkNmFhMGYwMjNkMiJ9" }, { "recordId": "49607016322531838770965472870714694318512579354665418754000000", "result": "Ok", "data": "eyJhIjogIjUxNDJhMGUwLTdiMWYtNGMxNS1iZjBiLThhMjU2ZmQzZjkyYyIsICJiIjogImNkZTVmNWRmLWUzMGMtNGE2Yi1iNjJjLWZiNmY3Zjk1NTYzNSIsICJjIjogIjljN2YyMWRlLWU2MWItNGJhZi05YjQ0LTg2MmE0YmVhZTVlMiJ9" } ] }
- Lambdaのresponseは上記形式になっている必要がある
- 自分は
records
の配列だけを返していて、形式が異なると怒られた
- 自分は
records
の各要素については、ドキュメントに詳しく記載されているdata
はBase64でエンコードしたもの
Firehoseの設定について (CloudFormation)
# Lambdaによるデータ変換において必要そうな要素のみ抜粋 DeliveryStream: Type: AWS::KinesisFirehose::DeliveryStream Properties: ExtendedS3DestinationConfiguration: ProcessingConfiguration: Enabled: true Processors: - Type: Lambda Parameters: - ParameterName: LambdaArn ParameterValue: !Ref ConvertLambda.Alias - ParameterName: NumberOfRetries ParameterValue: "3" - ParameterName: RoleArn ParameterValue: !GetAtt InvokeLambdaRole.Arn - ParameterName: BufferSizeInMBs ParameterValue: "1" - ParameterName: BufferIntervalInSeconds ParameterValue: "60" S3BackupMode: Enabled S3BackupConfiguration: RoleARN: !GetAtt DeliveryRole.Arn BucketARN: arn:aws:s3:::delivery-bucket Prefix: Backup/ ErrorOutputPrefix: "!{firehose:error-output-type}/" BufferingHints: SizeInMBs: 5 IntervalInSeconds: 300 CompressionFormat: UNCOMPRESSED
- Lambdaでデータ変換を行う場合、各
DestinationConfiguration
の中のProcessingConfiguration
で設定を記述する- 上記例では
ExtendedS3DestinationConfiguration
を使用している
- 上記例では
- 変換で呼び出すLambdaなどは、
Processors[].Parameters
のパラメータとして指定する RoleArn
では変換に使用するLambdaのInvoke権限が必要- 変更に失敗したときのために、
S3Backup
の設定をいれておくと良さそう